From: Jeroen van der Heijden Date: Fri, 30 Mar 2018 15:21:07 +0000 (+0200) Subject: Add no time span, and work on optimize X-Git-Tag: archive/raspbian/2.0.44-1+rpi1~1^2~3^2~9^2~46 X-Git-Url: https://dgit.raspbian.org/%22http://www.example.com/cgi/success//%22http:/www.example.com/cgi/success/?a=commitdiff_plain;h=39fecda30cee68bc01a00b6c99897a33c03db9ab;p=siridb-server.git Add no time span, and work on optimize --- diff --git a/grammar/gogrammar/grammar.go b/grammar/gogrammar/grammar.go index 562086ac..f33b4cc4 100644 --- a/grammar/gogrammar/grammar.go +++ b/grammar/gogrammar/grammar.go @@ -4,7 +4,7 @@ package grammar // should be used with the goleri module. // // Source class: SiriGrammar -// Created at: 2018-03-28 09:32:13 +// Created at: 2018-03-30 13:58:06 import ( "regexp" @@ -55,6 +55,8 @@ const ( GidFDerivative = iota GidFDifference = iota GidFFilter = iota + GidFFirst = iota + GidFLast = iota GidFLimit = iota GidFMax = iota GidFMean = iota @@ -144,6 +146,7 @@ const ( GidKFalse = iota GidKFifoFiles = iota GidKFilter = iota + GidKFirst = iota GidKFloat = iota GidKFor = iota GidKFrom = iota @@ -158,6 +161,7 @@ const ( GidKInteger = iota GidKIntersection = iota GidKIpSupport = iota + GidKLast = iota GidKLength = iota GidKLibuv = iota GidKLimit = iota @@ -257,6 +261,7 @@ const ( GidRevokeUser = iota GidSTART = iota GidSelectAggregate = iota + GidSelectAggregates = iota GidSelectStmt = iota GidSeriesColumns = iota GidSeriesMatch = iota @@ -336,6 +341,7 @@ func SiriGrammar() *goleri.Grammar { kFalse := goleri.NewKeyword(GidKFalse, "false", false) kFifoFiles := goleri.NewKeyword(GidKFifoFiles, "fifo_files", false) kFilter := goleri.NewKeyword(GidKFilter, "filter", false) + kFirst := goleri.NewKeyword(GidKFirst, "first", false) kFloat := goleri.NewKeyword(GidKFloat, "float", false) kFor := goleri.NewKeyword(GidKFor, "for", false) kFrom := goleri.NewKeyword(GidKFrom, "from", false) @@ -360,6 +366,7 @@ func SiriGrammar() *goleri.Grammar { goleri.NewKeyword(NoGid, "intersection", false), ) kIpSupport := goleri.NewKeyword(GidKIpSupport, "ip_support", false) + kLast := goleri.NewKeyword(GidKLast, "last", false) kLength := goleri.NewKeyword(GidKLength, "length", false) kLibuv := goleri.NewKeyword(GidKLibuv, "libuv", false) kLimit := goleri.NewKeyword(GidKLimit, "limit", false) @@ -1002,77 +1009,91 @@ func SiriGrammar() *goleri.Grammar { GidFMean, kMean, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fMedian := goleri.NewSequence( GidFMedian, kMedian, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fMedianLow := goleri.NewSequence( GidFMedianLow, kMedianLow, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fMedianHigh := goleri.NewSequence( GidFMedianHigh, kMedianHigh, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fSum := goleri.NewSequence( GidFSum, kSum, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fMin := goleri.NewSequence( GidFMin, kMin, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fMax := goleri.NewSequence( GidFMax, kMax, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fCount := goleri.NewSequence( GidFCount, kCount, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fVariance := goleri.NewSequence( GidFVariance, kVariance, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fPvariance := goleri.NewSequence( GidFPvariance, kPvariance, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fStddev := goleri.NewSequence( GidFStddev, kStddev, goleri.NewToken(NoGid, "("), - timeExpr, + goleri.NewOptional(NoGid, timeExpr), + goleri.NewToken(NoGid, ")"), + ) + fFirst := goleri.NewSequence( + GidFFirst, + kFirst, + goleri.NewToken(NoGid, "("), + goleri.NewOptional(NoGid, timeExpr), + goleri.NewToken(NoGid, ")"), + ) + fLast := goleri.NewSequence( + GidFLast, + kLast, + goleri.NewToken(NoGid, "("), + goleri.NewOptional(NoGid, timeExpr), goleri.NewToken(NoGid, ")"), ) fFilter := goleri.NewSequence( @@ -1109,6 +1130,8 @@ func SiriGrammar() *goleri.Grammar { kVariance, kPvariance, kStddev, + kFirst, + kLast, ), goleri.NewToken(NoGid, ")"), ) @@ -1128,6 +1151,8 @@ func SiriGrammar() *goleri.Grammar { fVariance, fPvariance, fStddev, + fFirst, + fLast, fDifference, fDerivative, fFilter, @@ -1138,6 +1163,7 @@ func SiriGrammar() *goleri.Grammar { goleri.NewOptional(NoGid, prefixExpr), goleri.NewOptional(NoGid, suffixExpr), ) + selectAggregates := goleri.NewList(GidSelectAggregates, selectAggregate, goleri.NewToken(NoGid, ","), 1, 0, false) mergeAs := goleri.NewSequence( GidMergeAs, kMerge, @@ -1514,7 +1540,7 @@ func SiriGrammar() *goleri.Grammar { selectStmt := goleri.NewSequence( GidSelectStmt, kSelect, - goleri.NewList(NoGid, selectAggregate, goleri.NewToken(NoGid, ","), 1, 0, false), + selectAggregates, kFrom, seriesMatch, goleri.NewOptional(NoGid, whereSeries), @@ -1567,69 +1593,32 @@ func SiriGrammar() *goleri.Grammar { ), goleri.NewToken(NoGid, ","), 0, 0, false), ) timeitStmt := goleri.NewRepeat(GidTimeitStmt, kTimeit, 1, 1) - helpDropSeries := goleri.NewKeyword(GidHelpDropSeries, "series", false) + helpShow := goleri.NewKeyword(GidHelpShow, "show", false) + helpNoaccess := goleri.NewKeyword(GidHelpNoaccess, "noaccess", false) + helpRevoke := goleri.NewKeyword(GidHelpRevoke, "revoke", false) + helpDropUser := goleri.NewKeyword(GidHelpDropUser, "user", false) helpDropGroup := goleri.NewKeyword(GidHelpDropGroup, "group", false) - helpDropShards := goleri.NewKeyword(GidHelpDropShards, "shards", false) helpDropServer := goleri.NewKeyword(GidHelpDropServer, "server", false) - helpDropUser := goleri.NewKeyword(GidHelpDropUser, "user", false) + helpDropShards := goleri.NewKeyword(GidHelpDropShards, "shards", false) + helpDropSeries := goleri.NewKeyword(GidHelpDropSeries, "series", false) helpDrop := goleri.NewSequence( GidHelpDrop, kDrop, goleri.NewOptional(NoGid, goleri.NewChoice( NoGid, true, - helpDropSeries, + helpDropUser, helpDropGroup, - helpDropShards, helpDropServer, - helpDropUser, - )), - ) - helpGrant := goleri.NewKeyword(GidHelpGrant, "grant", false) - helpSelect := goleri.NewKeyword(GidHelpSelect, "select", false) - helpListGroups := goleri.NewKeyword(GidHelpListGroups, "groups", false) - helpListSeries := goleri.NewKeyword(GidHelpListSeries, "series", false) - helpListPools := goleri.NewKeyword(GidHelpListPools, "pools", false) - helpListUsers := goleri.NewKeyword(GidHelpListUsers, "users", false) - helpListShards := goleri.NewKeyword(GidHelpListShards, "shards", false) - helpListServers := goleri.NewKeyword(GidHelpListServers, "servers", false) - helpList := goleri.NewSequence( - GidHelpList, - kList, - goleri.NewOptional(NoGid, goleri.NewChoice( - NoGid, - true, - helpListGroups, - helpListSeries, - helpListPools, - helpListUsers, - helpListShards, - helpListServers, - )), - ) - helpRevoke := goleri.NewKeyword(GidHelpRevoke, "revoke", false) - helpShow := goleri.NewKeyword(GidHelpShow, "show", false) - helpTimezones := goleri.NewKeyword(GidHelpTimezones, "timezones", false) - helpNoaccess := goleri.NewKeyword(GidHelpNoaccess, "noaccess", false) - helpAccess := goleri.NewKeyword(GidHelpAccess, "access", false) - helpFunctions := goleri.NewKeyword(GidHelpFunctions, "functions", false) - helpCreateUser := goleri.NewKeyword(GidHelpCreateUser, "user", false) - helpCreateGroup := goleri.NewKeyword(GidHelpCreateGroup, "group", false) - helpCreate := goleri.NewSequence( - GidHelpCreate, - kCreate, - goleri.NewOptional(NoGid, goleri.NewChoice( - NoGid, - true, - helpCreateUser, - helpCreateGroup, + helpDropShards, + helpDropSeries, )), ) helpAlterUser := goleri.NewKeyword(GidHelpAlterUser, "user", false) - helpAlterServer := goleri.NewKeyword(GidHelpAlterServer, "server", false) helpAlterServers := goleri.NewKeyword(GidHelpAlterServers, "servers", false) helpAlterDatabase := goleri.NewKeyword(GidHelpAlterDatabase, "database", false) helpAlterGroup := goleri.NewKeyword(GidHelpAlterGroup, "group", false) + helpAlterServer := goleri.NewKeyword(GidHelpAlterServer, "server", false) helpAlter := goleri.NewSequence( GidHelpAlter, kAlter, @@ -1637,53 +1626,90 @@ func SiriGrammar() *goleri.Grammar { NoGid, true, helpAlterUser, - helpAlterServer, helpAlterServers, helpAlterDatabase, helpAlterGroup, + helpAlterServer, )), ) - helpCountServers := goleri.NewKeyword(GidHelpCountServers, "servers", false) - helpCountShards := goleri.NewKeyword(GidHelpCountShards, "shards", false) + helpSelect := goleri.NewKeyword(GidHelpSelect, "select", false) + helpTimezones := goleri.NewKeyword(GidHelpTimezones, "timezones", false) helpCountGroups := goleri.NewKeyword(GidHelpCountGroups, "groups", false) helpCountPools := goleri.NewKeyword(GidHelpCountPools, "pools", false) helpCountSeries := goleri.NewKeyword(GidHelpCountSeries, "series", false) helpCountUsers := goleri.NewKeyword(GidHelpCountUsers, "users", false) + helpCountServers := goleri.NewKeyword(GidHelpCountServers, "servers", false) + helpCountShards := goleri.NewKeyword(GidHelpCountShards, "shards", false) helpCount := goleri.NewSequence( GidHelpCount, kCount, goleri.NewOptional(NoGid, goleri.NewChoice( NoGid, true, - helpCountServers, - helpCountShards, helpCountGroups, helpCountPools, helpCountSeries, helpCountUsers, + helpCountServers, + helpCountShards, )), ) + helpFunctions := goleri.NewKeyword(GidHelpFunctions, "functions", false) + helpCreateUser := goleri.NewKeyword(GidHelpCreateUser, "user", false) + helpCreateGroup := goleri.NewKeyword(GidHelpCreateGroup, "group", false) + helpCreate := goleri.NewSequence( + GidHelpCreate, + kCreate, + goleri.NewOptional(NoGid, goleri.NewChoice( + NoGid, + true, + helpCreateUser, + helpCreateGroup, + )), + ) + helpGrant := goleri.NewKeyword(GidHelpGrant, "grant", false) helpTimeit := goleri.NewKeyword(GidHelpTimeit, "timeit", false) + helpAccess := goleri.NewKeyword(GidHelpAccess, "access", false) + helpListSeries := goleri.NewKeyword(GidHelpListSeries, "series", false) + helpListServers := goleri.NewKeyword(GidHelpListServers, "servers", false) + helpListPools := goleri.NewKeyword(GidHelpListPools, "pools", false) + helpListUsers := goleri.NewKeyword(GidHelpListUsers, "users", false) + helpListGroups := goleri.NewKeyword(GidHelpListGroups, "groups", false) + helpListShards := goleri.NewKeyword(GidHelpListShards, "shards", false) + helpList := goleri.NewSequence( + GidHelpList, + kList, + goleri.NewOptional(NoGid, goleri.NewChoice( + NoGid, + true, + helpListSeries, + helpListServers, + helpListPools, + helpListUsers, + helpListGroups, + helpListShards, + )), + ) help := goleri.NewSequence( GidHelp, kHelp, goleri.NewOptional(NoGid, goleri.NewChoice( NoGid, true, + helpShow, + helpNoaccess, + helpRevoke, helpDrop, - helpGrant, + helpAlter, helpSelect, - helpList, - helpRevoke, - helpShow, helpTimezones, - helpNoaccess, - helpAccess, + helpCount, helpFunctions, helpCreate, - helpAlter, - helpCount, + helpGrant, helpTimeit, + helpAccess, + helpList, )), ) START := goleri.NewSequence( diff --git a/grammar/grammar.py b/grammar/grammar.py index 8e033ea7..ccfffb83 100644 --- a/grammar/grammar.py +++ b/grammar/grammar.py @@ -75,6 +75,7 @@ class SiriGrammar(Grammar): k_false = Keyword('false') k_fifo_files = Keyword('fifo_files') k_filter = Keyword('filter') + k_first = Keyword('first') k_float = Keyword('float') k_for = Keyword('for') k_from = Keyword('from') @@ -92,6 +93,7 @@ class SiriGrammar(Grammar): Keyword('intersection'), most_greedy=False) k_ip_support = Keyword('ip_support') + k_last = Keyword('last') k_length = Keyword('length') k_libuv = Keyword('libuv') k_limit = Keyword('limit') @@ -430,37 +432,43 @@ class SiriGrammar(Grammar): ')') f_mean = Sequence( k_mean, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_median = Sequence( k_median, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_median_low = Sequence( k_median_low, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_median_high = Sequence( k_median_high, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_sum = Sequence( k_sum, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_min = Sequence( k_min, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_max = Sequence( k_max, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_count = Sequence( k_count, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_variance = Sequence( k_variance, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_pvariance = Sequence( k_pvariance, - '(', time_expr, ')') + '(', Optional(time_expr), ')') f_stddev = Sequence( k_stddev, - '(', time_expr, ')') + '(', Optional(time_expr), ')') + f_first = Sequence( + k_first, + '(', Optional(time_expr), ')') + f_last = Sequence( + k_last, + '(', Optional(time_expr), ')') f_filter = Sequence( k_filter, @@ -489,6 +497,8 @@ class SiriGrammar(Grammar): k_variance, k_pvariance, k_stddev, + k_first, + k_last, most_greedy=False), ')') @@ -506,6 +516,8 @@ class SiriGrammar(Grammar): f_variance, f_pvariance, f_stddev, + f_first, + f_last, f_difference, f_derivative, f_filter, @@ -516,6 +528,8 @@ class SiriGrammar(Grammar): Optional(prefix_expr), Optional(suffix_expr)) + select_aggregates = List(select_aggregate, ',', 1) + merge_as = Sequence( k_merge, k_as, @@ -685,7 +699,7 @@ class SiriGrammar(Grammar): select_stmt = Sequence( k_select, - List(select_aggregate, ',', 1), + select_aggregates, k_from, series_match, Optional(where_series), diff --git a/include/siri/db/aggregate.h b/include/siri/db/aggregate.h index 19f8867a..c758aeda 100644 --- a/include/siri/db/aggregate.h +++ b/include/siri/db/aggregate.h @@ -40,3 +40,4 @@ siridb_points_t * siridb_aggregate_run( void siridb_init_aggregates(void); slist_t * siridb_aggregate_list(cleri_children_t * children, char * err_msg); void siridb_aggregate_list_free(slist_t * alist); +int siridb_aggregate_can_skip(cleri_children_t * children); diff --git a/include/siri/db/series.h b/include/siri/db/series.h index 80775bd9..e90c30ea 100644 --- a/include/siri/db/series.h +++ b/include/siri/db/series.h @@ -128,6 +128,11 @@ uint8_t siridb_series_server_id_by_name(const char * name); int siridb_series_open_store(siridb_t * siridb); void siridb__series_free(siridb_series_t *__restrict series); void siridb__series_decref(siridb_series_t * series); +siridb_points_t * siridb_series_get_first( + siridb_series_t * series, int * required_shard); +siridb_points_t * siridb_series_get_last( + siridb_series_t * series, int * required_shard); +siridb_points_t * siridb_series_get_count(siridb_series_t * series); /* * Increment the series reference counter. */ diff --git a/include/siri/grammar/grammar.h b/include/siri/grammar/grammar.h index 9f77604f..0a97a1ce 100644 --- a/include/siri/grammar/grammar.h +++ b/include/siri/grammar/grammar.h @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2018-03-28 09:32:13 + * Created at: 2018-03-30 13:58:06 */ #ifndef CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ #define CLERI_EXPORT_SIRI_GRAMMAR_GRAMMAR_H_ @@ -55,6 +55,8 @@ enum cleri_grammar_ids { CLERI_GID_F_DERIVATIVE, CLERI_GID_F_DIFFERENCE, CLERI_GID_F_FILTER, + CLERI_GID_F_FIRST, + CLERI_GID_F_LAST, CLERI_GID_F_LIMIT, CLERI_GID_F_MAX, CLERI_GID_F_MEAN, @@ -144,6 +146,7 @@ enum cleri_grammar_ids { CLERI_GID_K_FALSE, CLERI_GID_K_FIFO_FILES, CLERI_GID_K_FILTER, + CLERI_GID_K_FIRST, CLERI_GID_K_FLOAT, CLERI_GID_K_FOR, CLERI_GID_K_FROM, @@ -158,6 +161,7 @@ enum cleri_grammar_ids { CLERI_GID_K_INTEGER, CLERI_GID_K_INTERSECTION, CLERI_GID_K_IP_SUPPORT, + CLERI_GID_K_LAST, CLERI_GID_K_LENGTH, CLERI_GID_K_LIBUV, CLERI_GID_K_LIMIT, @@ -256,6 +260,7 @@ enum cleri_grammar_ids { CLERI_GID_R_UINTEGER, CLERI_GID_R_UUID_STR, CLERI_GID_SELECT_AGGREGATE, + CLERI_GID_SELECT_AGGREGATES, CLERI_GID_SELECT_STMT, CLERI_GID_SERIES_COLUMNS, CLERI_GID_SERIES_MATCH, diff --git a/include/siri/parser/queries.h b/include/siri/parser/queries.h index bfa3e1d9..23de3313 100644 --- a/include/siri/parser/queries.h +++ b/include/siri/parser/queries.h @@ -24,6 +24,7 @@ #include #define QUERIES_IGNORE_DROP_THRESHOLD 1 +#define QUERIES_SKIP_GET_POINTS 2 enum { @@ -46,6 +47,7 @@ typedef enum #define QUERY_DEF \ uint8_t tp; \ +uint8_t flags; \ imap_t * series_map; \ imap_t * series_tmp; \ imap_t * pmap; \ @@ -89,7 +91,6 @@ typedef struct query_drop_s { QUERY_DEF size_t n; // keep a counter for number of drops. - uint8_t flags; // flags like ignore threshold slist_t * shards_list; } query_drop_t; diff --git a/src/siri/db/aggregate.c b/src/siri/db/aggregate.c index 4fcca403..fea468a1 100644 --- a/src/siri/db/aggregate.c +++ b/src/siri/db/aggregate.c @@ -71,6 +71,10 @@ static siridb_points_t * AGGREGATE_filter( siridb_points_t * source, siridb_aggr_t * aggr, char * err_msg); +static siridb_points_t * AGGREGATE_to_one( + siridb_points_t * source, + siridb_aggr_t * aggr, + char * err_msg); static siridb_points_t * AGGREGATE_group_by( siridb_points_t * source, siridb_aggr_t * aggr, @@ -153,6 +157,19 @@ static int aggr_stddev( siridb_points_t * points, siridb_aggr_t * aggr, char * err_msg); + +static int aggr_first( + siridb_point_t * point, + siridb_points_t * points, + siridb_aggr_t * aggr, + char * err_msg); + +static int aggr_last( + siridb_point_t * point, + siridb_points_t * points, + siridb_aggr_t * aggr, + char * err_msg); + /* * Initialize aggregates. */ @@ -177,6 +194,8 @@ void siridb_init_aggregates(void) AGGREGATES[CLERI_GID_F_SUM - F_OFFSET] = aggr_sum; AGGREGATES[CLERI_GID_F_VARIANCE - F_OFFSET] = aggr_variance; AGGREGATES[CLERI_GID_F_STDDEV - F_OFFSET] = aggr_stddev; + AGGREGATES[CLERI_GID_F_FIRST - F_OFFSET] = aggr_first; + AGGREGATES[CLERI_GID_F_LAST - F_OFFSET] = aggr_last; } /* @@ -267,6 +286,14 @@ slist_t * siridb_aggregate_list(cleri_children_t * children, char * err_msg) aggr->gid = CLERI_GID_F_STDDEV; break; + case CLERI_GID_K_FIRST: + aggr->gid = CLERI_GID_F_FIRST; + break; + + case CLERI_GID_K_LAST: + aggr->gid = CLERI_GID_F_LAST; + break; + default: assert (0); break; @@ -359,6 +386,19 @@ slist_t * siridb_aggregate_list(cleri_children_t * children, char * err_msg) break; case CLERI_GID_F_DIFFERENCE: + case CLERI_GID_F_COUNT: + case CLERI_GID_F_MAX: + case CLERI_GID_F_MEAN: + case CLERI_GID_F_MEDIAN: + case CLERI_GID_F_MEDIAN_HIGH: + case CLERI_GID_F_MEDIAN_LOW: + case CLERI_GID_F_MIN: + case CLERI_GID_F_PVARIANCE: + case CLERI_GID_F_SUM: + case CLERI_GID_F_VARIANCE: + case CLERI_GID_F_STDDEV: + case CLERI_GID_F_FIRST: + case CLERI_GID_F_LAST: AGGR_NEW if (children->node->children->node->children-> next->next->next != NULL) @@ -383,35 +423,6 @@ slist_t * siridb_aggregate_list(cleri_children_t * children, char * err_msg) break; - case CLERI_GID_F_COUNT: - case CLERI_GID_F_MAX: - case CLERI_GID_F_MEAN: - case CLERI_GID_F_MEDIAN: - case CLERI_GID_F_MEDIAN_HIGH: - case CLERI_GID_F_MEDIAN_LOW: - case CLERI_GID_F_MIN: - case CLERI_GID_F_PVARIANCE: - case CLERI_GID_F_SUM: - case CLERI_GID_F_VARIANCE: - case CLERI_GID_F_STDDEV: - AGGR_NEW - aggr->group_by = children->node->children->node->children-> - next->next->node->result; - - if (!aggr->group_by) - { - sprintf(err_msg, - "Group by time must be an integer value " - "larger than zero."); - AGGREGATE_free(aggr); - siridb_aggregate_list_free(slist); - return NULL; - } - - SLIST_APPEND - - break; - case CLERI_GID_F_POINTS: break; @@ -443,6 +454,24 @@ void siridb_aggregate_list_free(slist_t * alist) free(alist); } +/* + * Returns 1 (true) if at least one aggregation requires all points to be queried. + */ +int siridb_aggregate_can_skip(cleri_children_t * children) +{ + switch (children->node->children->node->cl_obj->gid) + { + case CLERI_GID_F_COUNT: + case CLERI_GID_F_FIRST: + case CLERI_GID_F_LAST: + return \ + children->node->children->node->children->next->next->next == NULL; + + default: + return 0; + } +} + /* * Return a new allocated points object or the same object as source. * In case of an error NULL is returned and an error message is set or a @@ -479,8 +508,7 @@ siridb_points_t * siridb_aggregate_run( return AGGREGATE_filter(source, aggr, err_msg); default: - assert (0); - break; + return AGGREGATE_to_one(source, aggr, err_msg); } return NULL; @@ -846,6 +874,64 @@ static siridb_points_t * AGGREGATE_filter( return points; } +static siridb_points_t * AGGREGATE_to_one( + siridb_points_t * source, + siridb_aggr_t * aggr, + char * err_msg) +{ + siridb_points_t * points; + /* get correct callback function */ + AGGR_cb aggr_cb = AGGREGATES[aggr->gid - F_OFFSET]; + + /* create new points with max possible size after re-indexing */ + switch(aggr->gid) + { + case CLERI_GID_F_MEAN: + case CLERI_GID_F_MEDIAN: + case CLERI_GID_F_PVARIANCE: + case CLERI_GID_F_VARIANCE: + case CLERI_GID_F_STDDEV: + points = siridb_points_new(1, TP_DOUBLE); + break; + case CLERI_GID_F_COUNT: + points = siridb_points_new(1, TP_INT); + break; + case CLERI_GID_F_MEDIAN_HIGH: + case CLERI_GID_F_MAX: + case CLERI_GID_F_MEDIAN_LOW: + case CLERI_GID_F_MIN: + case CLERI_GID_F_SUM: + case CLERI_GID_F_FIRST: + case CLERI_GID_F_LAST: + points = siridb_points_new(1, source->tp); + break; + default: + assert (0); + points = NULL; + } + + if (points == NULL) + { + sprintf(err_msg, "Memory allocation error."); + return NULL; /* signal is raised */ + } + + /* set time-stamp */ + points->data->ts = source->data[ + (aggr->gid == CLERI_GID_F_FIRST) ? 0 : (source->len - 1)].ts; + + /* set value */ + if (aggr_cb(points->data, source, aggr, err_msg)) + { + /* error occurred, return NULL */ + siridb_points_free(points); + return NULL; + } + + points->len++; + return points; +} + static siridb_points_t * AGGREGATE_group_by( siridb_points_t * source, siridb_aggr_t * aggr, @@ -891,6 +977,8 @@ static siridb_points_t * AGGREGATE_group_by( case CLERI_GID_F_MIN: case CLERI_GID_F_SUM: case CLERI_GID_F_DIFFERENCE: + case CLERI_GID_F_FIRST: + case CLERI_GID_F_LAST: points = siridb_points_new(max_sz, group.tp); break; default: @@ -1464,3 +1552,75 @@ static int aggr_stddev( return 0; } + +static int aggr_first( + siridb_point_t * point, + siridb_points_t * points, + siridb_aggr_t * aggr __attribute__((unused)), + char * err_msg __attribute__((unused))) +{ +#if DEBUG + assert (points->len); +#endif + siridb_point_t * source = points->data[0]; + + switch (points->tp) + { + case TP_STRING: + point->ts = source->ts; + point->val.str = strdup(source->val.str); + if (point->val.str == NULL) + { + sprintf(err_msg, "Memory allocation error."); + return -1; + } + break; + + case TP_INT: + case TP_DOUBLE: + point->val = source->val; + break; + + default: + assert (0); + break; + } + + return 0; +} + +static int aggr_last( + siridb_point_t * point, + siridb_points_t * points, + siridb_aggr_t * aggr __attribute__((unused)), + char * err_msg __attribute__((unused))) +{ +#if DEBUG + assert (points->len); +#endif + siridb_point_t * source = points->data[points->len - 1]; + + switch (points->tp) + { + case TP_STRING: + point->ts = source->ts; + point->val.str = strdup(source->val.str); + if (point->val.str == NULL) + { + sprintf(err_msg, "Memory allocation error."); + return -1; + } + break; + + case TP_INT: + case TP_DOUBLE: + point->val = source->val; + break; + + default: + assert (0); + break; + } + + return 0; +} diff --git a/src/siri/db/series.c b/src/siri/db/series.c index 8aca61cd..d2084949 100644 --- a/src/siri/db/series.c +++ b/src/siri/db/series.c @@ -792,6 +792,134 @@ void siridb__series_decref(siridb_series_t * series) } } +siridb_points_t * siridb_series_get_first( + siridb_series_t * series, int * required_shard) +{ + siridb_point_t * point; + siridb_points_t * buf = series->buffer; + siridb_points_t * points; + + *required_shard = 0; + + if (buf != NULL && + buf->len && + (points->data = buf->data[0])->ts == series->start) + { + points = siridb_points_new(1, series->tp); + if (points == NULL) + { + return NULL; + } + + /* string type does not have a buffer so we don't have to worry */ + points->len = 1; + return points; + } + *required_shard = 1; + + /* if not in the buffer, then if must be in a shard */ + assert (series->idx_len); + + idx_t * first = series->idx; + + points = siridb_points_new(first->len, series->tp); + + siridb_shard_get_points_callback(first->shard->flags, series)( + points, + first, + NULL, + series->start, + series->flags & SIRIDB_SERIES_HAS_OVERLAP); + + assert (points->len); + + while (points->len > 1) + { + --points->len; + if (points->tp == TP_STRING) + { + free((points->data + points->len)->val.str); + } + } + + return points; +} + +siridb_points_t * siridb_series_get_last( + siridb_series_t * series, int * required_shard) +{ + siridb_point_t * point; + siridb_points_t * buf = series->buffer; + siridb_points_t * points; + + *required_shard = 0; + + if (buf != NULL && + buf->len && + (points->data = buf->data[buf->len - 1])->ts == series->end) + { + points = siridb_points_new(1, series->tp); + if (points == NULL) + { + return NULL; + } + + /* string type does not have a buffer so we don't have to worry */ + points->len = 1; + return points; + } + *required_shard = 1; + + /* if not in the buffer, then if must be in a shard */ + assert (series->idx_len); + + size_t i = series->idx_len - 1; + idx_t * idx = series->idx + i; + idx_t * last = idx; + + for (; i && last->shard == (--idx)->shard; --i) + { + if (idx->end_ts > last->end_ts) + { + last = idx; + } + } + + points = siridb_points_new(last->len, series->tp); + + siridb_shard_get_points_callback(last->shard->flags, series)( + points, + last, + last->end_ts, + NULL, + series->flags & SIRIDB_SERIES_HAS_OVERLAP); + + assert (points->len); + + while (points->len > 1) + { + --points->len; + if (points->tp == TP_STRING) + { + free((points->data + points->len)->val.str); + } + } + + return points; +} + +siridb_points_t * siridb_series_get_count(siridb_series_t * series) +{ + siridb_points_t * points = siridb_points_new(1, TP_INT); + if (points != NULL) + { + points->data->ts = series->end; + points->data->val.int64 = series->length; + points->len = 1; + } + return points; +} + /* * Calculate the server id. * Returns 0 or 1, representing a server in a pool) diff --git a/src/siri/grammar/grammar.c b/src/siri/grammar/grammar.c index 8c61c861..fbd7c80a 100644 --- a/src/siri/grammar/grammar.c +++ b/src/siri/grammar/grammar.c @@ -5,7 +5,7 @@ * should be used with the libcleri module. * * Source class: SiriGrammar - * Created at: 2018-03-28 09:32:13 + * Created at: 2018-03-30 13:58:06 */ #include "siri/grammar/grammar.h" @@ -60,6 +60,7 @@ cleri_grammar_t * compile_grammar(void) cleri_t * k_false = cleri_keyword(CLERI_GID_K_FALSE, "false", CLERI_CASE_SENSITIVE); cleri_t * k_fifo_files = cleri_keyword(CLERI_GID_K_FIFO_FILES, "fifo_files", CLERI_CASE_SENSITIVE); cleri_t * k_filter = cleri_keyword(CLERI_GID_K_FILTER, "filter", CLERI_CASE_SENSITIVE); + cleri_t * k_first = cleri_keyword(CLERI_GID_K_FIRST, "first", CLERI_CASE_SENSITIVE); cleri_t * k_float = cleri_keyword(CLERI_GID_K_FLOAT, "float", CLERI_CASE_SENSITIVE); cleri_t * k_for = cleri_keyword(CLERI_GID_K_FOR, "for", CLERI_CASE_SENSITIVE); cleri_t * k_from = cleri_keyword(CLERI_GID_K_FROM, "from", CLERI_CASE_SENSITIVE); @@ -86,6 +87,7 @@ cleri_grammar_t * compile_grammar(void) cleri_keyword(CLERI_NONE, "intersection", CLERI_CASE_SENSITIVE) ); cleri_t * k_ip_support = cleri_keyword(CLERI_GID_K_IP_SUPPORT, "ip_support", CLERI_CASE_SENSITIVE); + cleri_t * k_last = cleri_keyword(CLERI_GID_K_LAST, "last", CLERI_CASE_SENSITIVE); cleri_t * k_length = cleri_keyword(CLERI_GID_K_LENGTH, "length", CLERI_CASE_SENSITIVE); cleri_t * k_libuv = cleri_keyword(CLERI_GID_K_LIBUV, "libuv", CLERI_CASE_SENSITIVE); cleri_t * k_limit = cleri_keyword(CLERI_GID_K_LIMIT, "limit", CLERI_CASE_SENSITIVE); @@ -817,7 +819,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_mean, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_median = cleri_sequence( @@ -825,7 +827,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_median, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_median_low = cleri_sequence( @@ -833,7 +835,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_median_low, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_median_high = cleri_sequence( @@ -841,7 +843,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_median_high, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_sum = cleri_sequence( @@ -849,7 +851,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_sum, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_min = cleri_sequence( @@ -857,7 +859,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_min, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_max = cleri_sequence( @@ -865,7 +867,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_max, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_count = cleri_sequence( @@ -873,7 +875,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_count, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_variance = cleri_sequence( @@ -881,7 +883,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_variance, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_pvariance = cleri_sequence( @@ -889,7 +891,7 @@ cleri_grammar_t * compile_grammar(void) 4, k_pvariance, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_stddev = cleri_sequence( @@ -897,7 +899,23 @@ cleri_grammar_t * compile_grammar(void) 4, k_stddev, cleri_token(CLERI_NONE, "("), - time_expr, + cleri_optional(CLERI_NONE, time_expr), + cleri_token(CLERI_NONE, ")") + ); + cleri_t * f_first = cleri_sequence( + CLERI_GID_F_FIRST, + 4, + k_first, + cleri_token(CLERI_NONE, "("), + cleri_optional(CLERI_NONE, time_expr), + cleri_token(CLERI_NONE, ")") + ); + cleri_t * f_last = cleri_sequence( + CLERI_GID_F_LAST, + 4, + k_last, + cleri_token(CLERI_NONE, "("), + cleri_optional(CLERI_NONE, time_expr), cleri_token(CLERI_NONE, ")") ); cleri_t * f_filter = cleri_sequence( @@ -926,7 +944,7 @@ cleri_grammar_t * compile_grammar(void) cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 11, + 13, k_mean, k_median, k_median_high, @@ -937,14 +955,16 @@ cleri_grammar_t * compile_grammar(void) k_count, k_variance, k_pvariance, - k_stddev + k_stddev, + k_first, + k_last ), cleri_token(CLERI_NONE, ")") ); cleri_t * aggregate_functions = cleri_list(CLERI_GID_AGGREGATE_FUNCTIONS, cleri_choice( CLERI_NONE, CLERI_FIRST_MATCH, - 16, + 18, f_points, f_limit, f_mean, @@ -958,6 +978,8 @@ cleri_grammar_t * compile_grammar(void) f_variance, f_pvariance, f_stddev, + f_first, + f_last, f_difference, f_derivative, f_filter @@ -969,6 +991,7 @@ cleri_grammar_t * compile_grammar(void) cleri_optional(CLERI_NONE, prefix_expr), cleri_optional(CLERI_NONE, suffix_expr) ); + cleri_t * select_aggregates = cleri_list(CLERI_GID_SELECT_AGGREGATES, select_aggregate, cleri_token(CLERI_NONE, ","), 1, 0, 0); cleri_t * merge_as = cleri_sequence( CLERI_GID_MERGE_AS, 4, @@ -1408,7 +1431,7 @@ cleri_grammar_t * compile_grammar(void) CLERI_GID_SELECT_STMT, 7, k_select, - cleri_list(CLERI_NONE, select_aggregate, cleri_token(CLERI_NONE, ","), 1, 0, 0), + select_aggregates, k_from, series_match, cleri_optional(CLERI_NONE, where_series), @@ -1464,11 +1487,14 @@ cleri_grammar_t * compile_grammar(void) ), cleri_token(CLERI_NONE, ","), 0, 0, 0) ); cleri_t * timeit_stmt = cleri_dup(CLERI_GID_TIMEIT_STMT, k_timeit); - cleri_t * help_drop_series = cleri_keyword(CLERI_GID_HELP_DROP_SERIES, "series", CLERI_CASE_SENSITIVE); + cleri_t * help_show = cleri_keyword(CLERI_GID_HELP_SHOW, "show", CLERI_CASE_SENSITIVE); + cleri_t * help_noaccess = cleri_keyword(CLERI_GID_HELP_NOACCESS, "noaccess", CLERI_CASE_SENSITIVE); + cleri_t * help_revoke = cleri_keyword(CLERI_GID_HELP_REVOKE, "revoke", CLERI_CASE_SENSITIVE); + cleri_t * help_drop_user = cleri_keyword(CLERI_GID_HELP_DROP_USER, "user", CLERI_CASE_SENSITIVE); cleri_t * help_drop_group = cleri_keyword(CLERI_GID_HELP_DROP_GROUP, "group", CLERI_CASE_SENSITIVE); - cleri_t * help_drop_shards = cleri_keyword(CLERI_GID_HELP_DROP_SHARDS, "shards", CLERI_CASE_SENSITIVE); cleri_t * help_drop_server = cleri_keyword(CLERI_GID_HELP_DROP_SERVER, "server", CLERI_CASE_SENSITIVE); - cleri_t * help_drop_user = cleri_keyword(CLERI_GID_HELP_DROP_USER, "user", CLERI_CASE_SENSITIVE); + cleri_t * help_drop_shards = cleri_keyword(CLERI_GID_HELP_DROP_SHARDS, "shards", CLERI_CASE_SENSITIVE); + cleri_t * help_drop_series = cleri_keyword(CLERI_GID_HELP_DROP_SERIES, "series", CLERI_CASE_SENSITIVE); cleri_t * help_drop = cleri_sequence( CLERI_GID_HELP_DROP, 2, @@ -1477,62 +1503,18 @@ cleri_grammar_t * compile_grammar(void) CLERI_NONE, CLERI_MOST_GREEDY, 5, - help_drop_series, + help_drop_user, help_drop_group, - help_drop_shards, help_drop_server, - help_drop_user - )) - ); - cleri_t * help_grant = cleri_keyword(CLERI_GID_HELP_GRANT, "grant", CLERI_CASE_SENSITIVE); - cleri_t * help_select = cleri_keyword(CLERI_GID_HELP_SELECT, "select", CLERI_CASE_SENSITIVE); - cleri_t * help_list_groups = cleri_keyword(CLERI_GID_HELP_LIST_GROUPS, "groups", CLERI_CASE_SENSITIVE); - cleri_t * help_list_series = cleri_keyword(CLERI_GID_HELP_LIST_SERIES, "series", CLERI_CASE_SENSITIVE); - cleri_t * help_list_pools = cleri_keyword(CLERI_GID_HELP_LIST_POOLS, "pools", CLERI_CASE_SENSITIVE); - cleri_t * help_list_users = cleri_keyword(CLERI_GID_HELP_LIST_USERS, "users", CLERI_CASE_SENSITIVE); - cleri_t * help_list_shards = cleri_keyword(CLERI_GID_HELP_LIST_SHARDS, "shards", CLERI_CASE_SENSITIVE); - cleri_t * help_list_servers = cleri_keyword(CLERI_GID_HELP_LIST_SERVERS, "servers", CLERI_CASE_SENSITIVE); - cleri_t * help_list = cleri_sequence( - CLERI_GID_HELP_LIST, - 2, - k_list, - cleri_optional(CLERI_NONE, cleri_choice( - CLERI_NONE, - CLERI_MOST_GREEDY, - 6, - help_list_groups, - help_list_series, - help_list_pools, - help_list_users, - help_list_shards, - help_list_servers - )) - ); - cleri_t * help_revoke = cleri_keyword(CLERI_GID_HELP_REVOKE, "revoke", CLERI_CASE_SENSITIVE); - cleri_t * help_show = cleri_keyword(CLERI_GID_HELP_SHOW, "show", CLERI_CASE_SENSITIVE); - cleri_t * help_timezones = cleri_keyword(CLERI_GID_HELP_TIMEZONES, "timezones", CLERI_CASE_SENSITIVE); - cleri_t * help_noaccess = cleri_keyword(CLERI_GID_HELP_NOACCESS, "noaccess", CLERI_CASE_SENSITIVE); - cleri_t * help_access = cleri_keyword(CLERI_GID_HELP_ACCESS, "access", CLERI_CASE_SENSITIVE); - cleri_t * help_functions = cleri_keyword(CLERI_GID_HELP_FUNCTIONS, "functions", CLERI_CASE_SENSITIVE); - cleri_t * help_create_user = cleri_keyword(CLERI_GID_HELP_CREATE_USER, "user", CLERI_CASE_SENSITIVE); - cleri_t * help_create_group = cleri_keyword(CLERI_GID_HELP_CREATE_GROUP, "group", CLERI_CASE_SENSITIVE); - cleri_t * help_create = cleri_sequence( - CLERI_GID_HELP_CREATE, - 2, - k_create, - cleri_optional(CLERI_NONE, cleri_choice( - CLERI_NONE, - CLERI_MOST_GREEDY, - 2, - help_create_user, - help_create_group + help_drop_shards, + help_drop_series )) ); cleri_t * help_alter_user = cleri_keyword(CLERI_GID_HELP_ALTER_USER, "user", CLERI_CASE_SENSITIVE); - cleri_t * help_alter_server = cleri_keyword(CLERI_GID_HELP_ALTER_SERVER, "server", CLERI_CASE_SENSITIVE); cleri_t * help_alter_servers = cleri_keyword(CLERI_GID_HELP_ALTER_SERVERS, "servers", CLERI_CASE_SENSITIVE); cleri_t * help_alter_database = cleri_keyword(CLERI_GID_HELP_ALTER_DATABASE, "database", CLERI_CASE_SENSITIVE); cleri_t * help_alter_group = cleri_keyword(CLERI_GID_HELP_ALTER_GROUP, "group", CLERI_CASE_SENSITIVE); + cleri_t * help_alter_server = cleri_keyword(CLERI_GID_HELP_ALTER_SERVER, "server", CLERI_CASE_SENSITIVE); cleri_t * help_alter = cleri_sequence( CLERI_GID_HELP_ALTER, 2, @@ -1542,18 +1524,20 @@ cleri_grammar_t * compile_grammar(void) CLERI_MOST_GREEDY, 5, help_alter_user, - help_alter_server, help_alter_servers, help_alter_database, - help_alter_group + help_alter_group, + help_alter_server )) ); - cleri_t * help_count_servers = cleri_keyword(CLERI_GID_HELP_COUNT_SERVERS, "servers", CLERI_CASE_SENSITIVE); - cleri_t * help_count_shards = cleri_keyword(CLERI_GID_HELP_COUNT_SHARDS, "shards", CLERI_CASE_SENSITIVE); + cleri_t * help_select = cleri_keyword(CLERI_GID_HELP_SELECT, "select", CLERI_CASE_SENSITIVE); + cleri_t * help_timezones = cleri_keyword(CLERI_GID_HELP_TIMEZONES, "timezones", CLERI_CASE_SENSITIVE); cleri_t * help_count_groups = cleri_keyword(CLERI_GID_HELP_COUNT_GROUPS, "groups", CLERI_CASE_SENSITIVE); cleri_t * help_count_pools = cleri_keyword(CLERI_GID_HELP_COUNT_POOLS, "pools", CLERI_CASE_SENSITIVE); cleri_t * help_count_series = cleri_keyword(CLERI_GID_HELP_COUNT_SERIES, "series", CLERI_CASE_SENSITIVE); cleri_t * help_count_users = cleri_keyword(CLERI_GID_HELP_COUNT_USERS, "users", CLERI_CASE_SENSITIVE); + cleri_t * help_count_servers = cleri_keyword(CLERI_GID_HELP_COUNT_SERVERS, "servers", CLERI_CASE_SENSITIVE); + cleri_t * help_count_shards = cleri_keyword(CLERI_GID_HELP_COUNT_SHARDS, "shards", CLERI_CASE_SENSITIVE); cleri_t * help_count = cleri_sequence( CLERI_GID_HELP_COUNT, 2, @@ -1562,15 +1546,54 @@ cleri_grammar_t * compile_grammar(void) CLERI_NONE, CLERI_MOST_GREEDY, 6, - help_count_servers, - help_count_shards, help_count_groups, help_count_pools, help_count_series, - help_count_users + help_count_users, + help_count_servers, + help_count_shards + )) + ); + cleri_t * help_functions = cleri_keyword(CLERI_GID_HELP_FUNCTIONS, "functions", CLERI_CASE_SENSITIVE); + cleri_t * help_create_user = cleri_keyword(CLERI_GID_HELP_CREATE_USER, "user", CLERI_CASE_SENSITIVE); + cleri_t * help_create_group = cleri_keyword(CLERI_GID_HELP_CREATE_GROUP, "group", CLERI_CASE_SENSITIVE); + cleri_t * help_create = cleri_sequence( + CLERI_GID_HELP_CREATE, + 2, + k_create, + cleri_optional(CLERI_NONE, cleri_choice( + CLERI_NONE, + CLERI_MOST_GREEDY, + 2, + help_create_user, + help_create_group )) ); + cleri_t * help_grant = cleri_keyword(CLERI_GID_HELP_GRANT, "grant", CLERI_CASE_SENSITIVE); cleri_t * help_timeit = cleri_keyword(CLERI_GID_HELP_TIMEIT, "timeit", CLERI_CASE_SENSITIVE); + cleri_t * help_access = cleri_keyword(CLERI_GID_HELP_ACCESS, "access", CLERI_CASE_SENSITIVE); + cleri_t * help_list_series = cleri_keyword(CLERI_GID_HELP_LIST_SERIES, "series", CLERI_CASE_SENSITIVE); + cleri_t * help_list_servers = cleri_keyword(CLERI_GID_HELP_LIST_SERVERS, "servers", CLERI_CASE_SENSITIVE); + cleri_t * help_list_pools = cleri_keyword(CLERI_GID_HELP_LIST_POOLS, "pools", CLERI_CASE_SENSITIVE); + cleri_t * help_list_users = cleri_keyword(CLERI_GID_HELP_LIST_USERS, "users", CLERI_CASE_SENSITIVE); + cleri_t * help_list_groups = cleri_keyword(CLERI_GID_HELP_LIST_GROUPS, "groups", CLERI_CASE_SENSITIVE); + cleri_t * help_list_shards = cleri_keyword(CLERI_GID_HELP_LIST_SHARDS, "shards", CLERI_CASE_SENSITIVE); + cleri_t * help_list = cleri_sequence( + CLERI_GID_HELP_LIST, + 2, + k_list, + cleri_optional(CLERI_NONE, cleri_choice( + CLERI_NONE, + CLERI_MOST_GREEDY, + 6, + help_list_series, + help_list_servers, + help_list_pools, + help_list_users, + help_list_groups, + help_list_shards + )) + ); cleri_t * help = cleri_sequence( CLERI_GID_HELP, 2, @@ -1579,20 +1602,20 @@ cleri_grammar_t * compile_grammar(void) CLERI_NONE, CLERI_MOST_GREEDY, 14, + help_show, + help_noaccess, + help_revoke, help_drop, - help_grant, + help_alter, help_select, - help_list, - help_revoke, - help_show, help_timezones, - help_noaccess, - help_access, + help_count, help_functions, help_create, - help_alter, - help_count, - help_timeit + help_grant, + help_timeit, + help_access, + help_list )) ); cleri_t * START = cleri_sequence( diff --git a/src/siri/parser/listener.c b/src/siri/parser/listener.c index 546f2459..7cab55c5 100644 --- a/src/siri/parser/listener.c +++ b/src/siri/parser/listener.c @@ -44,6 +44,7 @@ #define MAX_ITERATE_COUNT 10000 // ten-thousand +#define SKIP_GET_POINTS -1 #define QP_ADD_SUCCESS qp_add_raw( \ query->packer, (const unsigned char *) "success_msg", 11); @@ -243,6 +244,7 @@ static void exit_list_users(uv_async_t * handle); static void exit_revoke_user(uv_async_t * handle); static void exit_select_aggregate(uv_async_t * handle); static void exit_select_stmt(uv_async_t * handle); +static void exit_series_match(uv_async_t * handle); static void exit_set_address(uv_async_t * handle); static void exit_set_backup_mode(uv_async_t * handle); static void exit_set_drop_threshold(uv_async_t * handle); @@ -261,6 +263,7 @@ static void async_drop_series(uv_async_t * handle); static void async_drop_shards(uv_async_t * handle); static void async_filter_series(uv_async_t * handle); static void async_list_series(uv_async_t * handle); +static void async_no_points_aggregate(uv_async_t * handle); static void async_select_aggregate(uv_async_t * handle); static void async_series_re(uv_async_t * handle); @@ -470,6 +473,7 @@ void siriparser_init_listener(void) siriparser_listen_exit[CLERI_GID_REVOKE_USER] = exit_revoke_user; siriparser_listen_exit[CLERI_GID_SELECT_AGGREGATE] = exit_select_aggregate; siriparser_listen_exit[CLERI_GID_SELECT_STMT] = exit_select_stmt; + siriparser_listen_exit[CLERI_GID_SERIES_MATCH] = exit_series_match; siriparser_listen_exit[CLERI_GID_SET_ADDRESS] = exit_set_address; siriparser_listen_exit[CLERI_GID_SET_BACKUP_MODE] = exit_set_backup_mode; siriparser_listen_exit[CLERI_GID_SET_DROP_THRESHOLD] = exit_set_drop_threshold; @@ -998,6 +1002,7 @@ static void enter_select_stmt(uv_async_t * handle) siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; query_select_t * q_select; cleri_children_t * child; + int skip_get_points; SIRIPARSER_MASTER_CHECK_ACCESS(SIRIDB_ACCESS_SELECT) MASTER_CHECK_ACCESSIBLE(siridb) @@ -1019,18 +1024,22 @@ static void enter_select_stmt(uv_async_t * handle) NULL : imap_new(); /* child is always the ',' and child->next the node */ - child = query->nodes->node->children->next->node->children->next; + child = query->nodes->node->children->next->node->children; + + child = child->next; while (child != NULL) { + if (skip_get_points && !siridb_aggregate_can_skip(child)) + { + skip_get_points = 0; + } q_select->nselects++; child = child->next->next; } - if (q_select->nselects > 1) + if (skip_get_points) { - /* We have more than one select request, let's use points caching. - * (Not critical, everything works if points_map is NULL) */ - q_select->points_map = imap_new(); + q_select->flags |= QUERIES_SKIP_GET_POINTS; } query->free_cb = (uv_close_cb) query_select_free; @@ -3198,6 +3207,27 @@ static void exit_revoke_user(uv_async_t * handle) } } +static void exit_series_match(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + query_select_t * q_select = (query_select_t *) query->data; + + if ((q_select->flags & QUERIES_SKIP_GET_POINTS) && + (q_select->start_ts != NULL || q_select->end_ts != NULL)) + { + q_select->flags &= ~QUERIES_SKIP_GET_POINTS; + } + + if ((~q_select->flags & QUERIES_SKIP_GET_POINTS) && q_select->nselects > 1) + { + /* We have more than one select request, let's use points caching. + * (Not critical, everything works if points_map is NULL) */ + q_select->points_map = imap_new(); + } + + SIRIPARSER_ASYNC_NEXT_NODE +} + static void exit_select_aggregate(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; @@ -3312,7 +3342,10 @@ static void exit_select_aggregate(uv_async_t * handle) uv_async_init( siri.loop, next, - (uv_async_cb) async_select_aggregate); + (uv_async_cb) ( + (q_select->flags & QUERIES_SKIP_GET_POINTS) ? + async_no_points_aggregate : + async_select_aggregate)); uv_async_send(next); uv_close((uv_handle_t *) handle, (uv_close_cb) free); @@ -4467,6 +4500,214 @@ static void async_list_series(uv_async_t * handle) } } +static void async_no_points_aggregate(uv_async_t * handle) +{ + siridb_query_t * query = (siridb_query_t *) handle->data; + query_select_t * q_select = (query_select_t *) query->data; + siridb_t * siridb = ((sirinet_socket_t *) query->client->data)->siridb; + uint8_t async_more = 0; + siridb_series_t * series; + siridb_points_t * points; + siridb_points_t * aggr_points; + int required_shard; + + if (q_select->n > siridb->select_points_limit) + { + snprintf(query->err_msg, + SIRIDB_MAX_SIZE_ERR_MSG, + "Query has reached the maximum number of selected points " + "(%u). Please use another time window, an aggregation " + "function or select less series to reduce the number of " + "points.", + siridb->select_points_limit); + + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + + uv_mutex_lock(&siridb->series_mutex); + + for (; q_select->slist_index < q_select->slist->len; + ++q_select->slist_index) + { + series = (siridb_series_t *) + q_select->slist->data[q_select->slist_index]; + /* + * We must decrement the ref count immediately since the index is + * incremented by one. The series will not be freed since at least + * 'series_map' still has a reference. + */ + siridb_series_decref(series); + +#if DEBUG + assert (q_select->alist->len >= 1); +#endif + + siridb_aggr_t * aggr = q_select->alist->data[0]; + switch (aggr->gid) + { + case CLERI_GID_F_COUNT: + points = siridb_series_get_count(series); + break; + case CLERI_GID_F_FIRST: + points = siridb_series_get_first(series, &required_shard); + break; + case CLERI_GID_F_LAST: + points = siridb_series_get_last(series, &required_shard); + break; + default: + assert (0); + } + if (points != NULL) + { + for (size_t i = 1; points->len && i < q_select->alist->len; i++) + { + aggr_points = siridb_aggregate_run( + points, + (siridb_aggr_t *) q_select->alist->data[i], + query->err_msg); + + if (aggr_points != points) + { + siridb_points_free(points); + } + + if (aggr_points == NULL) + { + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + + points = aggr_points; + } + + q_select->n += points->len; + } + } + + uv_mutex_unlock(&siridb->series_mutex); + + + + + /* We try to read the points from the cache in case a cache is created. + * If there are more select functions left we create a copy of the cache. + * When this is the last select function we pop from the cache since the + * points are no longer required. + */ + points = (q_select->points_map == NULL) ? + NULL : + q_select->nselects ? + siridb_points_copy(imap_get(q_select->points_map, series->id)): + imap_pop(q_select->points_map, series->id); + + if (points == NULL) + { + uv_mutex_lock(&siridb->series_mutex); + + points = (series->flags & SIRIDB_SERIES_IS_DROPPED) ? + NULL : siridb_series_get_points( + series, + q_select->start_ts, + q_select->end_ts); + uv_mutex_unlock(&siridb->series_mutex); + + /* when having a cache and points, add a copy of points to the cache */ + if (q_select->points_map != NULL && points != NULL) + { + siridb_points_t * cpoints = siridb_points_copy(points); + if (cpoints != NULL && + imap_add(q_select->points_map, series->id, cpoints)) + { + siridb_points_free(cpoints); + } + } + } + + if (points != NULL) + { + const char * name; + + for (size_t i = 0; points->len && i < q_select->alist->len; i++) + { + aggr_points = siridb_aggregate_run( + points, + (siridb_aggr_t *) q_select->alist->data[i], + query->err_msg); + + if (aggr_points != points) + { + siridb_points_free(points); + } + + if (aggr_points == NULL) + { + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + + points = aggr_points; + } + + q_select->n += points->len; + + if (q_select->merge_as == NULL) + { + name = siridb_presuf_name( + q_select->presuf, + series->name, + series->name_len); + + if (name == NULL || ct_add(q_select->result, name, points)) + { + sprintf(query->err_msg, "Error adding points to map."); + siridb_points_free(points); + log_critical("Critical error adding points"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + } + else + { + slist_t ** plist; + + name = siridb_presuf_name( + q_select->presuf, + q_select->merge_as, + strlen(q_select->merge_as)); + + plist = (slist_t **) ct_getaddr(q_select->result, name); + + if ( name == NULL || + plist == NULL || + slist_append_safe(plist, points)) + { + sprintf(query->err_msg, "Error adding points to map."); + siridb_points_free(points); + log_critical("Critical error adding points"); + siridb_query_send_error(handle, CPROTO_ERR_QUERY); + return; + } + } + } + + if (async_more) + { + uv_async_send(handle); + } + else + { + siridb_aggregate_list_free(q_select->alist); + q_select->alist = NULL; + + slist_free(q_select->slist); + q_select->slist = NULL; + q_select->slist_index = 0; + + SIRIPARSER_ASYNC_NEXT_NODE + } +} + static void async_select_aggregate(uv_async_t * handle) { siridb_query_t * query = (siridb_query_t *) handle->data; diff --git a/src/siri/parser/queries.c b/src/siri/parser/queries.c index c2930470..101dc7a8 100644 --- a/src/siri/parser/queries.c +++ b/src/siri/parser/queries.c @@ -21,6 +21,7 @@ #define DEFAULT_LIST_LIMIT 1000 #define QUERIES_NEW(q) \ +q->flags = 0; \ q->series_map = NULL; \ q->series_tmp = NULL; \ q->slist = NULL; \ diff --git a/src/test/test.c b/src/test/test.c index 52afabf3..317f45de 100644 --- a/src/test/test.c +++ b/src/test/test.c @@ -711,7 +711,7 @@ static int test_aggr_min(void) static int test_aggr_pvariance(void) { - test_start("Testing pvariance"); + test_start("Testing aggregation pvariance"); siridb_aggr_t aggr; siridb_points_t * result; @@ -769,7 +769,7 @@ static int test_aggr_sum(void) static int test_aggr_variance(void) { - test_start("Testing variance"); + test_start("Testing aggregation variance"); siridb_aggr_t aggr; siridb_points_t * result; @@ -798,7 +798,7 @@ static int test_aggr_variance(void) static int test_aggr_stddev(void) { - test_start("Testing standard deviation"); + test_start("Testing aggregation standard deviation"); siridb_aggr_t aggr; siridb_points_t * result; @@ -825,6 +825,64 @@ static int test_aggr_stddev(void) return test_end(TEST_OK); } +static int test_aggr_first(void) +{ + test_start("Testing aggregation first"); + + siridb_aggr_t aggr; + siridb_points_t * result; + char err_msg[SIRIDB_MAX_SIZE_ERR_MSG]; + siridb_points_t * points = prepare_points(); + + aggr.gid = CLERI_GID_F_FIRST; + aggr.group_by = 5; + aggr.limit = 0; + aggr.offset = 0; + + result = siridb_aggregate_run(points, &aggr, err_msg); + + assert (result != NULL); + assert (result->len == 5); + assert (result->tp == TP_INT); + assert (result->data->ts == 5 && result->data->val.int64 == 1); + assert ((result->data + 2)->ts == 15 && + (result->data + 2)->val.int64 == 4); + + siridb_points_free(result); + siridb_points_free(points); + + return test_end(TEST_OK); +} + +static int test_aggr_last(void) +{ + test_start("Testing aggregation last"); + + siridb_aggr_t aggr; + siridb_points_t * result; + char err_msg[SIRIDB_MAX_SIZE_ERR_MSG]; + siridb_points_t * points = prepare_points(); + + aggr.gid = CLERI_GID_F_LAST; + aggr.group_by = 5; + aggr.limit = 0; + aggr.offset = 0; + + result = siridb_aggregate_run(points, &aggr, err_msg); + + assert (result != NULL); + assert (result->len == 5); + assert (result->tp == TP_INT); + assert (result->data->ts == 5 && result->data->val.int64 == 1); + assert ((result->data + 2)->ts == 15 && + (result->data + 2)->val.int64 == 5); + + siridb_points_free(result); + siridb_points_free(points); + + return test_end(TEST_OK); +} + static int test_iso8601(void) { test_start("Testing iso8601"); @@ -983,6 +1041,8 @@ int run_tests(void) rc += test_aggr_sum(); rc += test_aggr_variance(); rc += test_aggr_stddev(); + rc += test_aggr_first(); + rc += test_aggr_last(); rc += test_iso8601(); rc += test_expr(); rc += test_access();